/* Copyright (c) 2022-2022, LiWangQian<liwangqian@huawei.com> All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this list of
 *    conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice, this list
 *    of conditions and the following disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * 3. Neither the name of the copyright holder nor the names of its contributors may be used
 *    to endorse or promote products derived from this software without specific prior written
 *    permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef LIBSIM_ASYNC_FILTER_H_INCLUDED
#define LIBSIM_ASYNC_FILTER_H_INCLUDED

#include "logpp/configs.h"
#include "logpp/buffers/circular_buffer.h"
#include "logpp/filters/ifilter.h"
#include "logpp/utils/memory.h"

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>

LOGPP_NS_BEGIN

/**
 * @brief Filter that hands messages to a background worker thread.
 *
 * Producers call filter(), which swaps the message into the `cached_` buffer.
 * The worker thread swaps `cached_` with `running_` under the mutex and then,
 * outside the lock, runs every message through the wrapped filter and writes
 * the ones that the wrapped filter did not reject (filter() returned false)
 * to the sink.
 *
 * Ownership: the filter and sink pointers passed to the constructor are
 * released with delete_object() in the destructor.
 *
 * Threading: `cached_` and the stop flag are published under `mtx_`;
 * `running_` and the statistics counters are touched only by the worker
 * (and by the destructor after the worker has been joined). start() and
 * stop() themselves are not synchronized against each other and are expected
 * to be called from a single controlling thread.
 *
 * @tparam FilterImpl type of the wrapped filter; must provide `bool filter(message &)`.
 * @tparam Sink       type of the sink; must provide open(), output(message &),
 *                    flush() and close().
 */
template <typename FilterImpl = ifilter, typename Sink = isink>
class async_filter : public ifilter {
public:
    using this_type = async_filter<FilterImpl, Sink>;
    using filter_type = FilterImpl;
    using sink_type = Sink;

    /**
     * Stops and joins the worker, prints the statistics line and releases the
     * worker thread object, the wrapped filter and the sink.
     */
    ~async_filter()
    {
        stop();
        print_statistic_info();
        delete_object(worker_);
        delete_object(filter_);
        delete_object(sink_);
    }

    /**
     * @param impl wrapped filter; released by the destructor. It is dereferenced
     *             by the worker without a null check.
     * @param sink output sink; released by the destructor. It is dereferenced
     *             by the worker without a null check.
     */
    async_filter(filter_type *impl, sink_type *sink)
        : filter_{impl}
        , sink_{sink}
    {
    }

    /* The object owns raw pointers and a thread; copying would double-free. */
    async_filter(const async_filter &) = delete;
    async_filter &operator=(const async_filter &) = delete;

    /**
     * Queues @p m for asynchronous processing.
     *
     * While the worker is running this retries until the message has been
     * swapped into the cache buffer. If the worker is not running the message
     * is not queued.
     *
     * @param m message to queue; its content is swapped into the cache buffer.
     * @return always true.
     */
    bool filter(message &m) noexcept override
    {
        while (is_running() && !try_cache_message(m)) {
            /* Buffer is full: give the worker a chance to take the lock and swap buffers. */
            std::this_thread::yield();
        }
        return true;
    }

    /** @return true when a worker thread object exists and stop has not been requested. */
    bool is_running() const noexcept
    {
        return worker_ != nullptr && !stop_;
    }

    /**
     * Starts the worker thread.
     *
     * Calling start() while the worker is already running is a no-op. Calling
     * it after stop() releases the previous (joined) thread object before a
     * new worker is created. On failure the filter is left in the stopped state.
     */
    void start() noexcept
    {
        if (worker_ != nullptr) {
            if (!stop_) return; /* already running */

            /* Reap the previous worker so its thread object is not leaked. */
            stop();
            delete_object(worker_);
            worker_ = nullptr;
        }

        try {
            stop_ = false;
            worker_ = new_object<std::thread>(&async_filter::message_loop, this);
            if (worker_ == nullptr) stop_ = true;
        } catch (...) {
            stop_ = true;
            delete_object(worker_);
            worker_ = nullptr;
        }
    }

    /**
     * Requests the worker to stop and waits for it to finish.
     *
     * Messages that are already cached are still processed by the worker
     * before it exits. Safe to call when the worker was never started or has
     * already been stopped.
     */
    void stop() noexcept
    {
        {
            /*
             * The flag must be changed while holding the mutex: otherwise the
             * worker could evaluate its wait predicate, miss the notification
             * below and then block forever, which would hang join().
             */
            std::lock_guard<std::mutex> lock{mtx_};
            stop_ = true;
        }
        cv_.notify_all();
        if (worker_ && worker_->joinable()) {
            worker_->join();
        }
    }

private:
    /**
     * Tries to swap @p m into the cache buffer and wakes the worker on success.
     * @return true if the message was stored, false if try_push_swap() refused it.
     */
    bool try_cache_message(message &m) noexcept
    {
        std::unique_lock<std::mutex> lock{mtx_};
        bool pushed = cached_.try_push_swap(m);
        if (pushed) cv_.notify_all();
        return pushed;
    }

    /**
     * Worker thread entry point: processes batches until stop has been
     * requested and the cache buffer has been drained.
     */
    void message_loop() noexcept
    {
        message m; /* reused across iterations to avoid constructing a message per loop */
        while (wait_messages()) {
            filter_messages(m);
        }
    }

    /**
     * Blocks until messages are cached or stop is requested, then moves the
     * cached batch into `running_`.
     *
     * @return true if a batch is ready in `running_`; false if stop was
     *         requested and nothing is left to process.
     */
    bool wait_messages() noexcept
    {
        std::unique_lock<std::mutex> lock{mtx_};
        cv_.wait(lock, [this]() { return !cached_.empty() || stop_; });
        /* An empty cache here means we were woken by stop with nothing left to drain. */
        if (cached_.empty()) return false;
        cached_.swap(running_);
        return true;
    }

    /**
     * Runs every message in `running_` through the wrapped filter and outputs
     * the ones it did not reject, then updates the statistics counters.
     *
     * @param m scratch message that each queued message is swapped into.
     */
    void filter_messages(message &m) noexcept
    {
        auto cnt = running_.size();
        if (cnt == 0) return;

        auto start = time::clock::now();

        sink_->open();
        while (running_.try_pop_swap(m)) {
            if (!filter_->filter(m)) {
                sink_->output(m);
            }
        }
        sink_->flush();
        sink_->close();

        auto stop = time::clock::now();
        total_filter_ += cnt;
        /*
         * NOTE(review): count() is expressed in time::clock's native tick
         * unit; the "ns" label in the statistics line assumes that unit is
         * nanoseconds -- confirm.
         */
        time_ns_ += (stop - start).count();
    }

    /** Prints the number of processed messages and the accumulated processing time. */
    void print_statistic_info() noexcept
    {
        std::cout << "[async-filter] messages count: " << total_filter_ << ", elapse: "
                  << time_ns_ << " ns"
                  << std::endl;
    }

    std::mutex mtx_;                    /* guards cached_ and publication of stop_ */
    std::condition_variable cv_;        /* signalled on new messages and on stop */
    circular_buffer<message> cached_;   /* filled by producers under mtx_ */
    circular_buffer<message> running_;  /* batch currently processed by the worker */
    filter_type *filter_{nullptr};
    sink_type *sink_{nullptr};
    std::thread *worker_{nullptr};
    std::atomic_bool stop_{true};

    // debug
    size_t total_filter_{0};
    uint64_t time_ns_{0};
};

LOGPP_NS_END

#endif /* LIBSIM_ASYNC_FILTER_H_INCLUDED */
